Skip to content

[SPARK-17556][SQL] Executor side broadcast for broadcast joins#15178

Closed
viirya wants to merge 12 commits into
apache:masterfrom
viirya:broadcast-on-executors
Closed

[SPARK-17556][SQL] Executor side broadcast for broadcast joins#15178
viirya wants to merge 12 commits into
apache:masterfrom
viirya:broadcast-on-executors

Conversation

@viirya

@viirya viirya commented Sep 21, 2016

Copy link
Copy Markdown
Member

What changes were proposed in this pull request?

The mechanism of broadcast in Spark is to collect the result of an RDD and then broadcast it. This introduces some extra latency. We can broadcast the RDD directly from executors. This patch implements broadcast from executors, and applies it on broadcast join of Spark SQL.

The advantages of executor-size broadcast:

  • The data of RDD doesn't need to collect to the driver before broadcasting
  • The driver isn't the bottleneck of data transmission at the beginning of broadcasting

Design document: https://issues.apache.org/jira/secure/attachment/12831201/executor-side-broadcast.pdf

Major API changes

  • New API broadcastRDDOnExecutor in SparkContext

    It takes two parameters rdd: RDD[T] and mode: BroadcastMode[T]. It will broadcast the content of the rdd between executors without collecting it back to the driver. mode is used to convert the content of the rdd to the broadcasted object.

    Besides T, this API has another type parameter U, which is the type of the converted object.

  • New Broadcast implementation TorrentExecutorBroadcast

    Different to TorrentBroadcast, this implementation doesn't divide and store object data waiting to broadcast in the driver. The executors use local and remote fetches to fetch the blocks of the RDD and convert the rdd content to broadcasted object.

  • BroadcastMode is moved from org.apache.spark.sql.catalyst.plans.physical to org.apache.spark.broadcast

    It is added a type parameter T now which is the converted type of the broadcasted object on executors.

Usage: How to use executor side broadcast

To broadcast the result of a RDD, instead of collecting the result back to the driver and broadcasting it, we can use executor side broadcast feature proposed in this proposal.

  1. Prepare the RDD to be broadcast

    // To broadcast the RDD on executors,
    // we should materialize and cache the result of the RDD
    val rdd = sc.parallelize(1 to 4, 2).cache()
    rdd.count()
    
  2. Define how to transform the result of the RDD with BroadcastMode

    val mode = new BroadcastMode[Int] {
      override def transform(rows: Array[Int]): Array[Int] = rows
    }
    
  3. Broadcast the RDD and use broadcasted variable

    val broadcastedVal = sc.broadcastRDDOnExecutor[Int, Array[Int]](rdd, mode)
    val collected = sc.parallelize(1 to 2, 2).map { _ =>
      broadcastedVal.value.reduce(_ + _) // 1 + 2 + 3 + 4 = 10
    }.collect()
    assert(collected.sum == 20)
    

How was this patch tested?

Jenkins tests.

Loading
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants